package com.huan.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

/**
 * 基于 DataStream Api实现 word count
 * 流处理： 数据来一条，处理一条。 解释： 获取到一行数据，依次执行下方的 flatMap、keyBy、sum、print方法。
 *
 * @author huan.fu
 * @date 2023/9/13 - 23:31
 */
public class LocalWebUiApplication {

    public static void main(String[] args) throws Exception {
        // 获取执行环境
        Configuration configuration = new Configuration();
        // 设置本地 web ui 的端口
        configuration.setInteger("rest.port", 9999);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // 设置为 流 的方式执行，也可以设置为 批 的方式执行， 也可通过条的时候通过 -Dexecution.runtime-mode=BATCH进行动态执行
        environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // 2、从文件中读取数据
        String filePath = "/Users/huan/code/IdeaProjects/me/spring-cloud-parent/flink/flink-local-test-ui/src/main/resources/word.txt";
        environment.readTextFile(filePath, StandardCharsets.UTF_8.name())
                // 3、将读取到一行数据进行切割、转换
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        // 每一行以空格进行分隔
                        String[] words = line.split(" ");
                        for (String word : words) {
                            // 使用 collector 向下游发送数据
                            collector.collect(Tuple2.of(word, 1));

                            // 本地测试，程序睡眠 50s
                            TimeUnit.SECONDS.sleep(50);
                        }
                    }
                })
                // 4、根据 word 进行分组 0表示的是 Tuple2对象中的第一个字段的值
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                        return tuple2.f0;
                    }
                })
                // 5、各分组内进行聚合 1表示的是 Tuple2对象中的第二个字段的值
                .sum(1)
                // 6、输出
                .print();

        /**
         * environment.execute[Async]() 执行多次会产生多次Job
         */
        // environment.executeAsync("异步执行");
        environment.execute("LocalWebUiApplication");
    }

}
